{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Copyright 2020 NVIDIA Corporation. All Rights Reserved.\n",
    "#\n",
    "# Licensed under the Apache License, Version 2.0 (the \"License\");\n",
    "# you may not use this file except in compliance with the License.\n",
    "# You may obtain a copy of the License at\n",
    "#\n",
    "#     http://www.apache.org/licenses/LICENSE-2.0\n",
    "#\n",
    "# Unless required by applicable law or agreed to in writing, software\n",
    "# distributed under the License is distributed on an \"AS IS\" BASIS,\n",
    "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n",
    "# See the License for the specific language governing permissions and\n",
    "# limitations under the License.\n",
    "# =============================================================================="
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Criteo Example \n",
    "Here we'll show how to use NVTabular first as a preprocessing library to prepare the [Criteo Display Advertising Challenge](https://www.kaggle.com/c/criteo-display-ad-challenge) dataset, and then as a dataloader to train a FastAI model on the prepared data. The large memory footprint of the Criteo dataset presents a great opportunity to highlight the advantages of the online fashion in which NVTabular loads and transforms data.\n",
    "\n",
    "### Data Prep\n",
    "Before we get started, make sure you've run the [optimize_criteo](./optimize_criteo.ipynb) notebook, which will convert the tsv data published by Criteo into the parquet format that our accelerated readers prefer. It's fair to mention at this point that that notebook will take around 30 minutes to run. While we're hoping to release accelerated csv readers in the near future, we also believe that inefficiencies in existing data representations like csv are in no small part a consequence of inefficiencies in the existing hardware/software stack. Accelerating these pipelines on new hardware like GPUs may require us to make new choices about the representations we use to store that data, and parquet represents a strong alternative."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "import os\n",
    "from time import time\n",
    "import re\n",
    "import glob\n",
    "import warnings\n",
    "\n",
    "# tools for data preproc/loading\n",
    "import torch\n",
    "import rmm\n",
    "import nvtabular as nvt\n",
    "from nvtabular.ops import Normalize,  Categorify,  LogOp, FillMissing, Clip, get_embedding_sizes\n",
    "from nvtabular.loader.torch import TorchAsyncItr, DLDataLoader\n",
    "from nvtabular.utils import device_mem_size\n",
    "\n",
    "# tools for training\n",
    "from fastai.basics import Learner\n",
    "from fastai.tabular.model import TabularModel\n",
    "from fastai.tabular.data import TabularDataLoaders\n",
    "from fastai.metrics import accuracy\n",
    "from fastai.callback.progress import ProgressCallback"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Initializing the Memory Pool\n",
    "For applications like the one that follows where RAPIDS will be the only workhorse user of GPU memory and resource, a best practice is to use the RAPIDS Memory Manager library `rmm` to allocate a dedicated pool of GPU memory that allows for fast, asynchronous memory management. Here, we'll dedicate 80% of free GPU memory to this pool to make sure we get the most utilization possible."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "/opt/conda/envs/rapids/lib/python3.7/site-packages/nvtabular/utils.py:47: UserWarning: get_memory_info is not supported. Using total device memory from NVML.\n",
      "  warnings.warn(\"get_memory_info is not supported. Using total device memory from NVML.\")\n"
     ]
    }
   ],
   "source": [
    "rmm.reinitialize(pool_allocator=True, initial_pool_size=0.8 * device_mem_size(kind='free'))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Dataset and Dataset Schema\n",
    "Once our data is ready, we'll define some high level parameters to describe where our data is and what it \"looks like\" at a high level."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [],
   "source": [
    "# define some information about where to get our data\n",
    "INPUT_DATA_DIR = os.environ.get('INPUT_DATA_DIR', '/raid/criteo/')\n",
    "OUTPUT_DATA_DIR = os.environ.get('OUTPUT_DATA_DIR', '/raid/test_dask') # where we'll save our procesed data to\n",
    "BATCH_SIZE = int(os.environ.get('BATCH_SIZE', 800000))\n",
    "PARTS_PER_CHUNK = int(os.environ.get('PARTS_PER_CHUNK', 2))\n",
    "NUM_TRAIN_DAYS = 23 # number of days worth of data to use for training, the rest will be used for validation\n",
    "\n",
    "# define our dataset schema\n",
    "CONTINUOUS_COLUMNS = ['I' + str(x) for x in range(1,14)]\n",
    "CATEGORICAL_COLUMNS =  ['C' + str(x) for x in range(1,27)]\n",
    "LABEL_COLUMNS = ['label']\n",
    "COLUMNS = CONTINUOUS_COLUMNS + CATEGORICAL_COLUMNS + LABEL_COLUMNS"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "_metadata\tday_12.parquet\tday_17.parquet\tday_21.parquet\tday_5.parquet\r\n",
      "day_0.parquet\tday_13.parquet\tday_18.parquet\tday_22.parquet\tday_6.parquet\r\n",
      "day_1.parquet\tday_14.parquet\tday_19.parquet\tday_23.parquet\tday_7.parquet\r\n",
      "day_10.parquet\tday_15.parquet\tday_2.parquet\tday_3.parquet\tday_8.parquet\r\n",
      "day_11.parquet\tday_16.parquet\tday_20.parquet\tday_4.parquet\tday_9.parquet\r\n"
     ]
    }
   ],
   "source": [
    "! ls $INPUT_DATA_DIR"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [],
   "source": [
    "fname = 'day_{}.parquet'\n",
    "num_days = len([i for i in os.listdir(INPUT_DATA_DIR) if re.match(fname.format('[0-9]{1,2}'), i) is not None])\n",
    "train_paths = [os.path.join(INPUT_DATA_DIR, fname.format(day)) for day in range(NUM_TRAIN_DAYS)]\n",
    "valid_paths = [os.path.join(INPUT_DATA_DIR, fname.format(day)) for day in range(NUM_TRAIN_DAYS, num_days)]"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Preprocessing\n",
    "At this point, our data still isn't in a form that's ideal for consumption by neural networks. The most pressing issues are missing values and the fact that our categorical variables are still represented by random, discrete identifiers, and need to be transformed into contiguous indices that can be leveraged by a learned embedding. Less pressing, but still important for learning dynamics, are the distributions of our continuous variables, which are distributed across multiple orders of magnitude and are uncentered (i.e. E[x] != 0).\n",
    "\n",
    "We can fix these issues in a conscise and GPU-accelerated manner with an NVTabular `Workflow`. We'll instantiate one with our current dataset schema, then symbolically add operations _on_ that schema. By setting all these `Ops` to use `replace=True`, the schema itself will remain unmodified, while the variables represented by each field in the schema will be transformed.\n",
    "\n",
    "#### Frequency Thresholding\n",
    "One interesting thing worth pointing out is that we're using _frequency thresholding_ in our `Categorify` op. This handy functionality will map all categories which occur in the dataset with some threshold level of infrequency (which we've set here to be 15 occurrences throughout the dataset) to the _same_ index, keeping the model from overfitting to sparse signals."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [],
   "source": [
    "proc = nvt.Workflow(\n",
    "    cat_names=CATEGORICAL_COLUMNS,\n",
    "    cont_names=CONTINUOUS_COLUMNS,\n",
    "    label_name=LABEL_COLUMNS)\n",
    "\n",
    "# log -> normalize continuous features. Note that doing this in the opposite\n",
    "# order wouldn't make sense! Note also that we're zero filling continuous\n",
    "# values before the log: this is a good time to remember that LogOp\n",
    "# performs log(1+x), not log(x)\n",
    "proc.add_cont_feature([FillMissing(), Clip(min_value=0), LogOp()])\n",
    "proc.add_cont_preprocess(Normalize())\n",
    "\n",
    "# categorification with frequency thresholding\n",
    "proc.add_cat_preprocess(Categorify(freq_threshold=15, out_path=OUTPUT_DATA_DIR))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now instantiate dataset iterators to loop through our dataset (which we couldn't fit into GPU memory)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [],
   "source": [
    "train_dataset = nvt.Dataset(train_paths, engine='parquet', part_mem_fraction=0.15)\n",
    "valid_dataset = nvt.Dataset(valid_paths, engine='parquet', part_mem_fraction=0.15)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now run them through our workflows to collect statistics on the train set, then transform and save to parquet files."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [],
   "source": [
    "output_train_dir = os.path.join(OUTPUT_DATA_DIR, 'train/')\n",
    "output_valid_dir = os.path.join(OUTPUT_DATA_DIR, 'valid/')\n",
    "! mkdir -p $output_train_dir\n",
    "! mkdir -p $output_valid_dir"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "For reference, let's time it to see how long it takes..."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "CPU times: user 11min 40s, sys: 13min 36s, total: 25min 17s\n",
      "Wall time: 26min 26s\n"
     ]
    }
   ],
   "source": [
    "%%time\n",
    "proc.apply(train_dataset, shuffle=nvt.io.Shuffle.PER_PARTITION, output_path=output_train_dir, out_files_per_proc=5)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "CPU times: user 18.1 s, sys: 40.2 s, total: 58.3 s\n",
      "Wall time: 1min 1s\n"
     ]
    }
   ],
   "source": [
    "%%time\n",
    "proc.apply(valid_dataset, record_stats=False, shuffle=nvt.io.Shuffle.PER_PARTITION, output_path=output_valid_dir, out_files_per_proc=5)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "And just like that, we have training and validation sets ready to feed to a model!"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Deep Learning\n",
    "### Data Loading\n",
    "We'll start by using the parquet files we just created to feed an NVTabular `TorchAsyncItr`, which will loop through the files in chunks. First, we'll reinitialize our memory pool from earlier to free up some memory so that we can share it with PyTorch."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "/opt/conda/envs/rapids/lib/python3.7/site-packages/nvtabular/utils.py:47: UserWarning: get_memory_info is not supported. Using total device memory from NVML.\n",
      "  warnings.warn(\"get_memory_info is not supported. Using total device memory from NVML.\")\n"
     ]
    }
   ],
   "source": [
    "rmm.reinitialize(pool_allocator=True, initial_pool_size=0.3 * device_mem_size(kind='free'))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {},
   "outputs": [],
   "source": [
    "train_paths = glob.glob(os.path.join(output_train_dir, \"*.parquet\"))\n",
    "valid_paths = glob.glob(os.path.join(output_valid_dir, \"*.parquet\"))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {},
   "outputs": [],
   "source": [
    "train_data = nvt.Dataset(train_paths, engine=\"parquet\", part_mem_fraction=0.04/PARTS_PER_CHUNK)\n",
    "valid_data = nvt.Dataset(valid_paths, engine=\"parquet\", part_mem_fraction=0.04/PARTS_PER_CHUNK)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {},
   "outputs": [],
   "source": [
    "train_data_itrs = TorchAsyncItr(\n",
    "    train_data,\n",
    "    batch_size=BATCH_SIZE,\n",
    "    cats=CATEGORICAL_COLUMNS,\n",
    "    conts=CONTINUOUS_COLUMNS,\n",
    "    labels=LABEL_COLUMNS,\n",
    "    parts_per_chunk=PARTS_PER_CHUNK\n",
    ")\n",
    "valid_data_itrs = TorchAsyncItr(\n",
    "    valid_data,\n",
    "    batch_size=BATCH_SIZE,\n",
    "    cats=CATEGORICAL_COLUMNS,\n",
    "    conts=CONTINUOUS_COLUMNS,\n",
    "    labels=LABEL_COLUMNS,\n",
    "    parts_per_chunk=PARTS_PER_CHUNK\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "metadata": {},
   "outputs": [],
   "source": [
    "def gen_col(batch):\n",
    "    return (batch[0], batch[1], batch[2].long())"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "metadata": {},
   "outputs": [],
   "source": [
    "train_dataloader = DLDataLoader(train_data_itrs, collate_fn=gen_col, batch_size=None, pin_memory=False, num_workers=0)\n",
    "valid_dataloader = DLDataLoader(valid_data_itrs, collate_fn=gen_col, batch_size=None, pin_memory=False, num_workers=0)\n",
    "databunch = TabularDataLoaders(train_dataloader, valid_dataloader)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now we have data ready to be fed to our model online!"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Training\n",
    "One extra handy functionality of NVTabular is the ability to use the stats collected by the `Categorify` op to define embedding dictionary sizes (i.e. the number of rows of your embedding table). It even includes a heuristic for computing a good embedding size (i.e. the number of columns of your embedding table) based off of the number of categories."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "metadata": {},
   "outputs": [],
   "source": [
    "embeddings = list(get_embedding_sizes(proc).values())"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 19,
   "metadata": {},
   "outputs": [],
   "source": [
    "model = TabularModel(emb_szs=embeddings, n_cont=len(CONTINUOUS_COLUMNS), out_sz=2, layers=[512, 256]).cuda()\n",
    "learn =  Learner(databunch, model, loss_func = torch.nn.CrossEntropyLoss(), metrics=[accuracy], cbs=ProgressCallback())"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 20,
   "metadata": {},
   "outputs": [],
   "source": [
    "from fastai.callback.schedule import fit_one_cycle"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 21,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[0, 0.12467262893915176, 0.12468979507684708, 0.9666252732276917, '2:26:41']\n",
      "8801.142189741135\n"
     ]
    }
   ],
   "source": [
    "learning_rate = 1.32e-2\n",
    "epochs = 1\n",
    "start = time()\n",
    "#learn.fit(epochs, learning_rate)\n",
    "fit_one_cycle(learn, n_epoch=epochs, lr_max=learning_rate)\n",
    "t_final = time() - start\n",
    "print(t_final)"
   ]
  }
 ],
 "metadata": {
  "file_extension": ".py",
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.8"
  },
  "mimetype": "text/x-python",
  "name": "python",
  "npconvert_exporter": "python",
  "pygments_lexer": "ipython3",
  "version": 3
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
